package com.deodar.kettle.platform.monitor.service;


import com.deodar.kettle.platform.common.App;
import com.deodar.kettle.platform.common.util.KettleConstant;
import com.deodar.kettle.platform.common.util.RepositoryUtils;
import com.deodar.kettle.platform.database.domain.RJob;
import com.deodar.kettle.platform.database.service.IRDirectoryService;
import com.deodar.kettle.platform.database.service.IRJobService;
import com.deodar.kettle.platform.monitor.executor.JobExecutor;
import com.deodar.kettle.platform.monitor.util.KettlePoolUtil;
import com.deodar.kettle.platform.monitor.util.TaskManageUtil;
import com.deodar.kettle.platform.parameter.service.IRnParameterService;
import lombok.extern.slf4j.Slf4j;
import org.pentaho.di.base.AbstractMeta;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.job.JobExecutionConfiguration;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.repository.ObjectId;
import org.pentaho.di.repository.Repository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

import java.util.Map;

/**
 * @author: Administrator
 * @Date: 2019/12/8 15:58
 * @Description:
 * @version:1.0.0
 */
@Slf4j
@Service("jobService")
public class JobService {

    @Autowired
    @Qualifier("directoryService")
    IRDirectoryService directoryService;

    @Autowired
    @Qualifier("rJobService")
    IRJobService rJobService;

    @Autowired
    IRnParameterService bnParameterService;


    public void  delete(Integer jobId){
        Repository repository = App.getInstance().getRepository();
        try {
            repository.deleteJob(RepositoryUtils.getObjectId(jobId));
        } catch (KettleException e) {
            e.printStackTrace();
        }

    }


    public boolean start(Integer id){
        return startJob(id,null);
    }

    public boolean startJob(Integer id,String logId){
        RJob job = rJobService.selectRJobById(id);

        try{
            Repository repository = App.getInstance().getRepository();
            ObjectId objectId = RepositoryUtils.getObjectId(id);
            //new LongObjectId(Long.valueOf(String.valueOf(id)));
            JobMeta jobMeta = repository.loadJob(objectId,null);
            JobExecutor jobExecutor = new JobExecutor(decode(null,bnParameterService.selectMapByGroupId(job.getParGroupId())),jobMeta,logId);
            KettlePoolUtil.getInstance().run(jobExecutor);

        } catch (KettleException e) {
            e.printStackTrace();
        }


        return true;
    }


    public JobExecutionConfiguration decode(AbstractMeta meta, Map<String,String> mapVariables){

        JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration();
        if(mapVariables != null) {
            jobExecutionConfiguration.setVariables(mapVariables);
        }
        jobExecutionConfiguration.setExecutingRemotely(true);
        jobExecutionConfiguration.setRepository(App.getInstance().getRepository());
        return jobExecutionConfiguration;

    }



    public boolean stop(String logId){
        String key = KettleConstant.KEY_PREFIX+logId;
        JobExecutor transExecutor = (JobExecutor) TaskManageUtil.getInstance().get(key);
        if(transExecutor != null){
            transExecutor.stop();
            transExecutor.setClickStop(true);
            log.info("接收停止信号，开始停止job任务");
            return true;
        }
        return false;
    }


}
